Rxjava操作符:第2篇 Transforming Observable

本文基于Rxjava 2.x版本,介绍用于变换 Observable 对象的操作符。

Operators that transform items that are emitted by an Observable.

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time

    定期将Observable中的项目收集到包中并发出这些包,而不是一次发送一个项目

  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

    将Observable发出的项目转换为Observables,然后将这些项目的排放量变为单个Observable

  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key

    将一个Observable划分为一组Observable,每个Observable从原始Observable中发出一组不同的项目,按键组织

  • Map — transform the items emitted by an Observable by applying a function to each item

    通过将函数应用于每个项目来转换Observable发出的项目

  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value

    将函数应用于Observable发出的每个项目,按顺序发出每个连续的值

  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

    定期将Observable中的项目细分为Observable窗口并发出这些窗口,而不是一次发出一个项目

buffer 操作符

buffer 操作符是缓存多个 emiter.onNext 事件,一次发出。buffer 方法有多个重载方法,可以仔细阅读源码注释。Buffer 方法的返回值是 Observable<List<T>> 类型。

1
2
3
4
5
6
7
8
Observable.range(0, 10)
.buffer(4)
.subscribe((List<Integer> buffer) -> System.out.println(buffer));

// prints:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]

cast 操作符

将被观察者传递的数据转换成指定类型。注意使用 fliter 操作符过滤非指定类型数据,以避免 Java 类型转换错误 ClassCastException

1
2
3
4
5
6
7
8
9
10
11
Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);

numbers.filter((Number x) -> Integer.class.isInstance(x))
.cast(Integer.class)
.subscribe((Integer x) -> System.out.println(x));

// prints:
// 1
// 7
// 12
// 5

concatMap 操作符

ConcatMap 操作符还有很多扩展方法,concatMapCompletable、 concatMapCompletableDelayError
、concatMapDelayError、 concatMapEager、
concatMapEagerDelayError、
concatMapIterable、 concatMapMaybe、
concatMapMaybeDelayError、 concatMapSingle
、concatMapSingleDelayError 等,这里主要介绍 concatMap 和 concatMapIterable 操作符。

ConcatMap 操作的是 Observable 对象,根据初始被观察者传递的数据转换成新的 Observable 向下传递。注意 ConcatMap 操作符是按照发射顺序执行的。

1
2
3
4
5
6
7
8
Observable.range(0, 5)
.concatMap(i -> {
long delay = Math.round(Math.random() * 2);
return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
})
.blockingSubscribe(System.out::print);

// prints 01234

concatIterable 操作符

concatIterable 操作符会将转换生成的 Iterable 列表数据中的每一项单独从发射源发出,且发射源数据是按照顺序发出的。

1
2
3
4
5
Observable.just("A", "B", "C")
.concatMapIterable(item -> Arrays.asList(item,item,item))
.subscribe(System.out::println);

//print AAABBBCCC

flatMap 操作符

作用和 concatMap 操作符类似。flatMap 操作符与 concatMap 操作符区别:flatMap 操作符不是按照发射顺序执行。这种操作符主要是需要通过源 Observable 数据 获取新的 Observable 数据时使用。例如从某请求地址获取到参数,然后根据获取到的参数请求另一地址获取最终我们需要的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just("A", "B", "C")
.flatMap(a -> {
return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.map(b -> '(' + a + ", " + b + ')');
})
.blockingSubscribe(System.out::println);

//打印顺序有可能不同
//(A, 1)
//(B, 1)
//(C, 1)
//(B, 2)
//(A, 2)
//(C, 2)
//(B, 3)
//(A, 3)
//(C, 3)

flatMapIterable 操作符

作用和 concatMapIterable 操作符相同,会将 Iterable 列表中的值生成 Observable<T> 发射源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.just(1, 2, 3, 4)
.flatMapIterable(x -> {
switch (x % 4) {
case 1:
return Arrays.asList("A");
case 2:
return Arrays.asList("B", "B");
case 3:
return Arrays.asList("C", "C", "C");
default:
return Arrays.asList();
}
})
.subscribe(System.out::print);
//print ABBCCC

groupBy 操作符

类似 butter 操作符打包多个发射项目,但是是根据 group(key,value) 来分包的。注意groupBy操作符返回值类型是 Observable<GroupedObservable<K, V>> ,发射源 onNext方法中传递的将会是 GroupedObservable<K, V> 类型,而我们需要的值在每个 GroupeObservable 中,这里我们使用 concatMapSingle 操作符将每个中的数据合并生成新的 Single<List<T>> 向下传递。

1
2
3
4
5
6
7
8
9
10
11
12
Observable<String> animals = Observable.just(
"Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");

animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
.concatMapSingle(Observable::toList)
.subscribe(System.out::println);

//println
//[TIGER, TURTLE]
//[ELEPHANT]
//[CAT, CHAMELEON]
//[FROG, FISH, FLAMINGO]

map 操作符

直接操作发射数据,返回变换后的数据并向下游传递。

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3)
.map(x -> x * x)
.subscribe(System.out::println);

// prints:
// 1
// 4
// 9

scan 操作符

scan操作符实现的是一种类似累加器的功能,如下所示 partialSum 参数为之前发射数据的和。

1
2
3
4
5
6
7
8
9
10
11
Observable.just(5, 3, 8, 1, 7)
.scan(0, (partialSum, x) -> partialSum + x)
.subscribe(System.out::println);

// prints:
// 0
// 5
// 8
// 16
// 17
// 24

window 操作符

window 操作符作用和 buffer 类似,不过 window 方法的返回值是 Observable<Observable<T>> 对象,直接传递的是Observable<T> 对象,在订阅方法中打印的是 Observable 对象,而不是其中传递的值。通过 flatMap 方法或者其扩展方法,既可处理按照 window(connt , skip) 设置打包数据的传递。而 buffer 方法的操作符返回值类型是 Observable<List<T>>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable.range(1, 10)

// Create windows containing at most 2 items, and skip 3 items before starting a new window.
.window(2, 3)
.flatMapSingle(window -> {
return window.map(String::valueOf)
.reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
})
.subscribe(System.out::println);

// prints:
// [1, 2] window 中 emitter.onNext(1); emitter.onNext(2);
// [4, 5]
// [7, 8]
// [10]

按照参数指定的数据打包为 Observable<T> 对象,参数 Observable 中会调用 onNext 方法来处理每一个集合数据。因为参数类型为 Observable<T> , 所有订阅方法中 onNext 方法只会调用4次。

参考文章:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×